<!DOCTYPE html>












  


<html class="theme-next pisces use-motion" lang="zh-CN">
<head><meta name="generator" content="Hexo 3.8.0">
  <meta charset="UTF-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge">
<meta name="viewport" content="width=device-width, initial-scale=1, maximum-scale=2">
<meta name="theme-color" content="#222">



















  
  
  
  

  
    
    
  

  
    
      
    

    
  

  

  
    
      
    

    
  

  
    
      
    

    
  

  
    
    
    <link rel="stylesheet" href="//fonts.googleapis.com/css?family=Monda:300,300italic,400,400italic,700,700italic|Roboto Slab:300,300italic,400,400italic,700,700italic|Lobster Two:300,300italic,400,400italic,700,700italic|PT Mono:300,300italic,400,400italic,700,700italic&subset=latin,latin-ext">
  






<link rel="stylesheet" href="/lib/font-awesome/css/font-awesome.min.css?v=4.7.0">

<link rel="stylesheet" href="/css/main.css?v=7.1.2">


  <link rel="apple-touch-icon" sizes="180x180" href="/images/apple-touch-icon-next.png?v=7.1.2">


  <link rel="icon" type="image/png" sizes="32x32" href="/images/favicon-32x32-next.png?v=7.1.2">


  <link rel="icon" type="image/png" sizes="16x16" href="/favicon.ico?v=7.1.2">


  <link rel="mask-icon" href="/images/logo.svg?v=7.1.2" color="#222">







<script id="hexo.configurations">
  var NexT = window.NexT || {};
  var CONFIG = {
    root: '/',
    scheme: 'Pisces',
    version: '7.1.2',
    sidebar: {"position":"left","display":"hide","offset":12,"onmobile":false,"dimmer":false},
    back2top: true,
    back2top_sidebar: false,
    fancybox: false,
    fastclick: false,
    lazyload: false,
    tabs: true,
    motion: {"enable":true,"async":true,"transition":{"post_block":"fadeIn","post_header":"slideDownIn","post_body":"slideDownIn","coll_header":"slideLeftIn","sidebar":"slideUpIn"}},
    algolia: {
      applicationID: '',
      apiKey: '',
      indexName: '',
      hits: {"per_page":10},
      labels: {"input_placeholder":"Search for Posts","hits_empty":"We didn't find any results for the search: ${query}","hits_stats":"${hits} results found in ${time} ms"}
    }
  };
</script>


  




  <meta name="description" content="本文主要分享内容如下：  状态管理的基本概念； 状态的类型与使用示例； 容错机制与故障恢复；">
<meta name="keywords" content="Flink">
<meta property="og:type" content="article">
<meta property="og:title" content="Apache Flink 零基础入门教程（六）：状态管理及容错机制">
<meta property="og:url" content="https://www.dudefu.tk/Apache Flink 零基础入门（六）：状态管理及容错机制.html">
<meta property="og:site_name" content="The Future">
<meta property="og:description" content="本文主要分享内容如下：  状态管理的基本概念； 状态的类型与使用示例； 容错机制与故障恢复；">
<meta property="og:locale" content="zh-CN">
<meta property="og:image" content="https://tva1.sinaimg.cn/large/008eGmZEgy1gmtz3ohveqj30th0ghdio.jpg">
<meta property="og:image" content="https://tva1.sinaimg.cn/large/008eGmZEgy1gmtz3rmjyfj30tg0ggtc8.jpg">
<meta property="og:image" content="https://tva1.sinaimg.cn/large/008eGmZEgy1gmtz3t5mn5j30tc0ggq4x.jpg">
<meta property="og:image" content="https://tva1.sinaimg.cn/large/008eGmZEgy1gmtz3zruwaj30te0gggo2.jpg">
<meta property="og:image" content="https://tva1.sinaimg.cn/large/008eGmZEgy1gmtz47r6qyj30tg0giwgo.jpg">
<meta property="og:image" content="https://tva1.sinaimg.cn/large/008eGmZEgy1gmtz4fdn0wj30tg0gjjtt.jpg">
<meta property="og:image" content="https://tva1.sinaimg.cn/large/008eGmZEgy1gmtz4ob1guj30td0ghn1d.jpg">
<meta property="og:image" content="https://tva1.sinaimg.cn/large/008eGmZEgy1gmtz5cplvyj30th0gj409.jpg">
<meta property="og:image" content="https://tva1.sinaimg.cn/large/008eGmZEgy1gmtz5hhbnej30td0gfdj6.jpg">
<meta property="og:image" content="https://tva1.sinaimg.cn/large/008eGmZEgy1gmtz5mh43hj30tf0gin09.jpg">
<meta property="og:image" content="https://tva1.sinaimg.cn/large/008eGmZEgy1gmtz5rfwzwj30ti0gjtc9.jpg">
<meta property="og:image" content="https://tva1.sinaimg.cn/large/008eGmZEgy1gmtz5vc8gbj30te0ggwhx.jpg">
<meta property="og:image" content="https://tva1.sinaimg.cn/large/008eGmZEgy1gmtz6bfbeqj30tg0gin0a.jpg">
<meta property="og:image" content="https://tva1.sinaimg.cn/large/008eGmZEgy1gmtz6emclxj30tf0ggwhi.jpg">
<meta property="og:image" content="https://tva1.sinaimg.cn/large/008eGmZEgy1gmtz6h5adhj30tg0ghn0p.jpg">
<meta property="og:updated_time" content="2021-01-20T03:16:15.198Z">
<meta name="twitter:card" content="summary">
<meta name="twitter:title" content="Apache Flink 零基础入门教程（六）：状态管理及容错机制">
<meta name="twitter:description" content="本文主要分享内容如下：  状态管理的基本概念； 状态的类型与使用示例； 容错机制与故障恢复；">
<meta name="twitter:image" content="https://tva1.sinaimg.cn/large/008eGmZEgy1gmtz3ohveqj30th0ghdio.jpg">





  
  
  <link rel="canonical" href="https://www.dudefu.tk/Apache Flink 零基础入门（六）：状态管理及容错机制">



<script id="page.configurations">
  CONFIG.page = {
    sidebar: "",
  };
</script>

  <title>Apache Flink 零基础入门教程（六）：状态管理及容错机制 | The Future</title>
  












  <noscript>
  <style>
  .use-motion .motion-element,
  .use-motion .brand,
  .use-motion .menu-item,
  .sidebar-inner,
  .use-motion .post-block,
  .use-motion .pagination,
  .use-motion .comments,
  .use-motion .post-header,
  .use-motion .post-body,
  .use-motion .collection-title { opacity: initial; }

  .use-motion .logo,
  .use-motion .site-title,
  .use-motion .site-subtitle {
    opacity: initial;
    top: initial;
  }

  .use-motion .logo-line-before i { left: initial; }
  .use-motion .logo-line-after i { right: initial; }
  </style>
</noscript>

</head>

<body itemscope="" itemtype="http://schema.org/WebPage" lang="zh-CN">

  
  
    
  

  <div class="container sidebar-position-left page-post-detail">
    <div class="headband"></div>

    <header id="header" class="header" itemscope="" itemtype="http://schema.org/WPHeader">
      <div class="header-inner"><div class="site-brand-wrapper">
  <div class="site-meta">
    

    <div class="custom-logo-site-title">
      <a href="/" class="brand" rel="start">
        <span class="logo-line-before"><i></i></span>
        <span class="site-title">The Future</span>
        <span class="logo-line-after"><i></i></span>
      </a>
    </div>
    
      
        <h1 class="site-subtitle" itemprop="description">Stay hungry,stay foolish.</h1>
      
    
    
  </div>

  <div class="site-nav-toggle">
    <button aria-label="切换导航栏">
      <span class="btn-bar"></span>
      <span class="btn-bar"></span>
      <span class="btn-bar"></span>
    </button>
  </div>
</div>



<nav class="site-nav">
  
    <ul id="menu" class="menu">
      
        
        
        
          
          <li class="menu-item menu-item-home">

    
    
    
      
    

    
      
    

    <a href="/" rel="section"><i class="menu-item-icon fa fa-fw fa-home"></i> <br>首页</a>

  </li>
        
        
        
          
          <li class="menu-item menu-item-archives">

    
    
    
      
    

    
      
    

    <a href="/archives/" rel="section"><i class="menu-item-icon fa fa-fw fa-archive"></i> <br>归档<span class="badge">125</span></a>

  </li>
        
        
        
          
          <li class="menu-item menu-item-categories">

    
    
    
      
    

    
      
    

    <a href="/categories" rel="section"><i class="menu-item-icon fa fa-fw fa-th"></i> <br>分类<span class="badge">15</span></a>

  </li>
        
        
        
          
          <li class="menu-item menu-item-tags">

    
    
    
      
    

    
      
    

    <a href="/tags" rel="section"><i class="menu-item-icon fa fa-fw fa-tags"></i> <br>标签<span class="badge">63</span></a>

  </li>
        
        
        
          
          <li class="menu-item menu-item-something">

    
    
    
      
    

    
      
    

    <a href="/something" rel="section"><i class="menu-item-icon fa fa-fw fa-paper-plane"></i> <br>干货</a>

  </li>
        
        
        
          
          <li class="menu-item menu-item-about">

    
    
    
      
    

    
      
    

    <a href="/about/" rel="section"><i class="menu-item-icon fa fa-fw fa-user"></i> <br>关于</a>

  </li>

      
      
        <li class="menu-item menu-item-search">
          
            <a href="javascript:;" class="popup-trigger">
          
            
              <i class="menu-item-icon fa fa-search fa-fw"></i> <br>搜索</a>
        </li>
      
    </ul>
  

  

  
    <div class="site-search">
      
  <div class="popup search-popup local-search-popup">
  <div class="local-search-header clearfix">
    <span class="search-icon">
      <i class="fa fa-search"></i>
    </span>
    <span class="popup-btn-close">
      <i class="fa fa-times-circle"></i>
    </span>
    <div class="local-search-input-wrapper">
      <input autocomplete="off" placeholder="搜索..." spellcheck="false" type="text" id="local-search-input">
    </div>
  </div>
  <div id="local-search-result"></div>
</div>



    </div>
  
</nav>



  



</div>
    </header>

    


    <main id="main" class="main">
      <div class="main-inner">
        <div class="content-wrap">
          
            

          
          <div id="content" class="content">
            

  <div id="posts" class="posts-expand">
    

  

  
  
  

  
    <div class="reading-progress-bar"></div>
  

  <article class="post post-type-normal" itemscope="" itemtype="http://schema.org/Article">
  
  
  
  <div class="post-block">
    <link itemprop="mainEntityOfPage" href="https://www.dudefu.tk/Apache Flink 零基础入门（六）：状态管理及容错机制.html">

    <span hidden itemprop="author" itemscope="" itemtype="http://schema.org/Person">
      <meta itemprop="name" content="Daniel X">
      <meta itemprop="description" content="專注于大数据技術，分享干货">
      <meta itemprop="image" content="https://hexoblog-1254111960.cos.ap-guangzhou.myqcloud.com/HexoBlog-tou.jpg">
    </span>

    <span hidden itemprop="publisher" itemscope="" itemtype="http://schema.org/Organization">
      <meta itemprop="name" content="The Future">
    </span>

    
      <header class="post-header">

        
        
          <h2 class="post-title" itemprop="name headline">Apache Flink 零基础入门教程（六）：状态管理及容错机制

              
            
          </h2>
        

        <div class="post-meta">
          <span class="post-time">

            
            
            

            
              <span class="post-meta-item-icon">
                <i class="fa fa-calendar-o"></i>
              </span>
              
                <span class="post-meta-item-text">发表于</span>
              

              
                
              

              <time title="创建时间：2021-01-20 10:53:45 / 修改时间：11:16:15" itemprop="dateCreated datePublished" datetime="2021-01-20T10:53:45+08:00">2021-01-20</time>
            

            
              

              
            
          </span>

          
            <span class="post-category">
            
              <span class="post-meta-divider">|</span>
            
              <span class="post-meta-item-icon">
                <i class="fa fa-folder-o"></i>
              </span>
              
                <span class="post-meta-item-text">分类于</span>
              
              
                <span itemprop="about" itemscope="" itemtype="http://schema.org/Thing"><a href="/categories/大数据/" itemprop="url" rel="index"><span itemprop="name">大数据</span></a></span>

                
                
              
            </span>
          

          
            
            
              
              <span class="post-comments-count">
                <span class="post-meta-divider">|</span>
                <span class="post-meta-item-icon">
                  <i class="fa fa-comment-o"></i>
                </span>
            
                <span class="post-meta-item-text">评论数：</span>
                <a href="/Apache Flink 零基础入门（六）：状态管理及容错机制.html#comments" itemprop="discussionUrl">
                  <span class="post-comments-count valine-comment-count" data-xid="/Apache Flink 零基础入门（六）：状态管理及容错机制.html" itemprop="commentCount"></span>
                </a>
              </span>
            
          

          
          
            <span id="/Apache Flink 零基础入门（六）：状态管理及容错机制.html" class="leancloud_visitors" data-flag-title="Apache Flink 零基础入门教程（六）：状态管理及容错机制">
              <span class="post-meta-divider">|</span>
              <span class="post-meta-item-icon">
                <i class="fa fa-eye"></i>
              </span>
              
                <span class="post-meta-item-text">阅读次数：</span>
              
                <span class="leancloud-visitors-count"></span>
            </span>
          

          

          

          

        </div>
      </header>
    

    
    
    
    <div class="post-body" itemprop="articleBody">

      
      

      
        <p>本文主要分享内容如下：</p>
<ul>
<li>状态管理的基本概念；</li>
<li>状态的类型与使用示例；</li>
<li>容错机制与故障恢复；</li>
</ul>
<a id="more"></a>
<h3 id="一-状态管理的基本概念"><a href="#一-状态管理的基本概念" class="headerlink" title="一.状态管理的基本概念"></a>一.状态管理的基本概念</h3><h4 id="1-什么是状态"><a href="#1-什么是状态" class="headerlink" title="1.什么是状态"></a>1.什么是状态</h4><p><img src="https://tva1.sinaimg.cn/large/008eGmZEgy1gmtz3ohveqj30th0ghdio.jpg" alt="1"></p>
<p>首先举一个无状态计算的例子：消费延迟计算。假设现在有一个消息队列，消息队列中有一个生产者持续往消费队列写入消息，多个消费者分别从消息队列中读取消息。从图上可以看出，生产者已经写入 16 条消息，Offset 停留在 15 ；有 3 个消费者，有的消费快，而有的消费慢。消费快的已经消费了 13 条数据，消费者慢的才消费了 7、8 条数据。</p>
<p>如何实时统计每个消费者落后多少条数据，如图给出了输入输出的示例。可以了解到输入的时间点有一个时间戳，生产者将消息写到了某个时间点的位置，每个消费者同一时间点分别读到了什么位置。刚才也提到了生产者写入了 15 条，消费者分别读取了 10、7、12 条。那么问题来了，怎么将生产者、消费者的进度转换为右侧示意图信息呢？</p>
<p>consumer 0 落后了 5 条，consumer 1 落后了 8 条，consumer 2 落后了 3 条，根据 Flink 的原理，此处需进行 Map 操作。Map 首先把消息读取进来，然后分别相减，即可知道每个 consumer 分别落后了几条。Map 一直往下发，则会得出最终结果。</p>
<p>大家会发现，在这种模式的计算中，无论这条输入进来多少次，输出的结果都是一样的，因为单条输入中已经包含了所需的所有信息。消费落后等于生产者减去消费者。生产者的消费在单条数据中可以得到，消费者的数据也可以在单条数据中得到，所以相同输入可以得到相同输出，这就是一个无状态的计算。</p>
<p><img src="https://tva1.sinaimg.cn/large/008eGmZEgy1gmtz3rmjyfj30tg0ggtc8.jpg" alt="2"></p>
<p>相应的什么是有状态的计算？</p>
<p>以访问日志统计量的例子进行说明，比如当前拿到一个 Nginx 访问日志，一条日志表示一个请求，记录该请求从哪里来，访问的哪个地址，需要实时统计每个地址总共被访问了多少次，也即每个 API 被调用了多少次。可以看到下面简化的输入和输出，输入第一条是在某个时间点请求 GET 了 /api/a；第二条日志记录了某个时间点 Post /api/b ;第三条是在某个时间点 GET了一个 /api/a，总共有 3 个 Nginx 日志。从这 3 条 Nginx 日志可以看出，第一条进来输出 /api/a 被访问了一次，第二条进来输出 /api/b 被访问了一次，紧接着又进来一条访问 api/a，所以 api/a 被访问了 2 次。不同的是，两条 /api/a 的 Nginx 日志进来的数据是一样的，但输出的时候结果可能不同，第一次输出 count=1 ，第二次输出 count=2，说明相同输入可能得到不同输出。输出的结果取决于当前请求的 API 地址之前累计被访问过多少次。第一条过来累计是 0 次，count = 1，第二条过来 API 的访问已经有一次了，所以 /api/a 访问累计次数 count=2。单条数据其实仅包含当前这次访问的信息，而不包含所有的信息。要得到这个结果，还需要依赖 API 累计访问的量，即状态。</p>
<p>这个计算模式是将数据输入算子中，用来进行各种复杂的计算并输出数据。这个过程中算子会去访问之前存储在里面的状态。另外一方面，它还会把现在的数据对状态的影响实时更新，如果输入 200 条数据，最后输出就是 200 条结果。</p>
<p><img src="https://tva1.sinaimg.cn/large/008eGmZEgy1gmtz3t5mn5j30tc0ggq4x.jpg" alt="3"></p>
<p>什么场景会用到状态呢？下面列举了常见的 4 种：</p>
<ul>
<li>去重：比如上游的系统数据可能会有重复，落到下游系统时希望把重复的数据都去掉。去重需要先了解哪些数据来过，哪些数据还没有来，也就是把所有的主键都记录下来，当一条数据到来后，能够看到在主键当中是否存在。</li>
<li>窗口计算：比如统计每分钟 Nginx 日志 API 被访问了多少次。窗口是一分钟计算一次，在窗口触发前，如 08:00 ~ 08:01 这个窗口，前59秒的数据来了需要先放入内存，即需要把这个窗口之内的数据先保留下来，等到 8:01 时一分钟后，再将整个窗口内触发的数据输出。未触发的窗口数据也是一种状态。</li>
<li>机器学习/深度学习：如训练的模型以及当前模型的参数也是一种状态，机器学习可能每次都用有一个数据集，需要在数据集上进行学习，对模型进行一个反馈。</li>
<li>访问历史数据：比如与昨天的数据进行对比，需要访问一些历史数据。如果每次从外部去读，对资源的消耗可能比较大，所以也希望把这些历史数据也放入状态中做对比。</li>
</ul>
<h4 id="2-为什么要管理状态"><a href="#2-为什么要管理状态" class="headerlink" title="2.为什么要管理状态"></a>2.为什么要管理状态</h4><p><img src="https://tva1.sinaimg.cn/large/008eGmZEgy1gmtz3zruwaj30te0gggo2.jpg" alt="4"></p>
<p>管理状态最直接的方式就是将数据都放到内存中，这也是很常见的做法。比如在做 WordCount 时，Word 作为输入，Count 作为输出。在计算的过程中把输入不断累加到 Count。</p>
<p>但对于流式作业有以下要求：</p>
<ul>
<li>7*24小时运行，高可靠；</li>
<li>数据不丢不重，恰好计算一次；</li>
<li>数据实时产出，不延迟；</li>
</ul>
<p>基于以上要求，内存的管理就会出现一些问题。由于内存的容量是有限制的。如果要做 24 小时的窗口计算，将 24 小时的数据都放到内存，可能会出现内存不足；另外，作业是 7*24，需要保障高可用，机器若出现故障或者宕机，需要考虑如何备份及从备份中去恢复，保证运行的作业不受影响；此外，考虑横向扩展，假如网站的访问量不高，统计每个 API 访问次数的程序可以用单线程去运行，但如果网站访问量突然增加，单节点无法处理全部访问数据，此时需要增加几个节点进行横向扩展，这时数据的状态如何平均分配到新增加的节点也问题之一。因此，将数据都放到内存中，并不是最合适的一种状态管理方式。</p>
<h4 id="3-理想的状态管理"><a href="#3-理想的状态管理" class="headerlink" title="3.理想的状态管理"></a>3.理想的状态管理</h4><p><img src="https://tva1.sinaimg.cn/large/008eGmZEgy1gmtz47r6qyj30tg0giwgo.jpg" alt="5"></p>
<p>最理想的状态管理需要满足易用、高效、可靠三点需求：</p>
<ul>
<li>易用，Flink 提供了丰富的数据结构、多样的状态组织形式以及简洁的扩展接口，让状态管理更加易用；</li>
<li>高效，实时作业一般需要更低的延迟，一旦出现故障，恢复速度也需要更快；当处理能力不够时，可以横向扩展，同时在处理备份时，不影响作业本身处理性能；</li>
<li>可靠，Flink 提供了状态持久化，包括不丢不重的语义以及具备自动的容错能力，比如 HA，当节点挂掉后会自动拉起，不需要人工介入。</li>
</ul>
<h3 id="二-Flink-状态的类型与使用示例"><a href="#二-Flink-状态的类型与使用示例" class="headerlink" title="二.Flink 状态的类型与使用示例"></a>二.Flink 状态的类型与使用示例</h3><h4 id="1-Managed-State-amp-Raw-State"><a href="#1-Managed-State-amp-Raw-State" class="headerlink" title="1.Managed State &amp; Raw State"></a>1.Managed State &amp; Raw State</h4><p><img src="https://tva1.sinaimg.cn/large/008eGmZEgy1gmtz4fdn0wj30tg0gjjtt.jpg" alt="6"></p>
<p>Managed State 是 Flink 自动管理的 State，而 Raw State 是原生态 State，两者的区别如下：</p>
<ul>
<li>从状态管理方式的方式来说，Managed State 由 Flink Runtime 管理，自动存储，自动恢复，在内存管理上有优化；而 Raw State 需要用户自己管理，需要自己序列化，Flink 不知道 State 中存入的数据是什么结构，只有用户自己知道，需要最终序列化为可存储的数据结构。</li>
<li>从状态数据结构来说，Managed State 支持已知的数据结构，如 Value、List、Map 等。而 Raw State只支持字节数组 ，所有状态都要转换为二进制字节数组才可以。</li>
<li>从推荐使用场景来说，Managed State 大多数情况下均可使用，而 Raw State 是当 Managed State 不够用时，比如需要自定义 Operator 时，推荐使用 Raw State。</li>
</ul>
<h4 id="2-Keyed-State-amp-Operator-State"><a href="#2-Keyed-State-amp-Operator-State" class="headerlink" title="2.Keyed State &amp; Operator State"></a>2.Keyed State &amp; Operator State</h4><p><img src="https://tva1.sinaimg.cn/large/008eGmZEgy1gmtz4ob1guj30td0ghn1d.jpg" alt="7"></p>
<p>Managed State 分为两种，一种是 Keyed State；另外一种是 Operator State。在Flink Stream模型中，Datastream 经过 keyBy 的操作可以变为 KeyedStream 。<br>每个 Key 对应一个 State，即一个 Operator 实例处理多个 Key，访问相应的多个 State，并由此就衍生了 Keyed State。Keyed State 只能用在 KeyedStream 的算子中，即在整个程序中没有 keyBy 的过程就没有办法使用 KeyedStream。</p>
<p>相比较而言，Operator State 可以用于所有算子，相对于数据源有一个更好的匹配方式，常用于 Source，例如 FlinkKafkaConsumer。相比 Keyed State，一个 Operator 实例对应一个 State，随着并发的改变，Keyed State 中，State 随着 Key 在实例间迁移，比如原来有 1 个并发，对应的 API 请求过来，/api/a 和 /api/b 都存放在这个实例当中；如果请求量变大，需要扩容，就会把 /api/a 的状态和 /api/b 的状态分别放在不同的节点。由于 Operator State 没有 Key，并发改变时需要选择状态如何重新分配。其中内置了 2 种分配方式：一种是均匀分配，另外一种是将所有 State 合并为全量 State 再分发给每个实例。</p>
<p>在访问上，Keyed State 通过 RuntimeContext 访问，这需要 Operator 是一个Rich Function。Operator State 需要自己实现 CheckpointedFunction 或 ListCheckpointed 接口。在数据结构上，Keyed State 支持的数据结构，比如 ValueState、ListState、ReducingState、AggregatingState 和 MapState；而 Operator State 支持的数据结构相对较少，如 ListState。</p>
<h4 id="3-Keyed-State-使用示例"><a href="#3-Keyed-State-使用示例" class="headerlink" title="3.Keyed State 使用示例"></a>3.Keyed State 使用示例</h4><p><img src="https://tva1.sinaimg.cn/large/008eGmZEgy1gmtz5cplvyj30th0gj409.jpg" alt="8"></p>
<p>Keyed State 有很多种，如图为几种 Keyed State 之间的关系。首先 State 的子类中一级子类有 ValueState、MapState、AppendingState。AppendingState 又有一个子类 MergingState。MergingState 又分为 3 个子类分别是ListState、ReducingState、AggregatingState。这个继承关系使它们的访问方式、数据结构也存在差异。</p>
<p><img src="https://tva1.sinaimg.cn/large/008eGmZEgy1gmtz5hhbnej30td0gfdj6.jpg" alt="9"></p>
<p>几种 Keyed State 的差异具体体现在：</p>
<ul>
<li>ValueState 存储单个值，比如 Wordcount，用 Word 当 Key，State 就是它的 Count。这里面的单个值可能是数值或者字符串，作为单个值，访问接口可能有两种，get 和 set。在 State 上体现的是 update(T) / T value()。</li>
<li>MapState 的状态数据类型是 Map，在 State 上有 put、remove等。需要注意的是在 MapState 中的 key 和 Keyed state 中的 key 不是同一个。</li>
<li>ListState 状态数据类型是 List，访问接口如 add、update 等。</li>
<li>ReducingState 和 AggregatingState 与 ListState 都是同一个父类，但状态数据类型上是单个值，原因在于其中的 add 方法不是把当前的元素追加到列表中，而是把当前元素直接更新进了 Reducing 的结果中。</li>
<li>AggregatingState 的区别是在访问接口，ReducingState 中 add（T）和 T get() 进去和出来的元素都是同一个类型，但在 AggregatingState 输入的 IN，输出的是 OUT。</li>
</ul>
<p><img src="https://tva1.sinaimg.cn/large/008eGmZEgy1gmtz5mh43hj30tf0gin09.jpg" alt="10"></p>
<p>下面以 ValueState 为例，来阐述一下具体如何使用，以状态机的案例来讲解 。</p>
<p>源代码地址：<span class="exturl" data-url="aHR0cHM6Ly9naXRodWIuY29tL2FwYWNoZS9mbGluay9ibG9iL21hc3Rlci9mbGluay1leGFtcGxlcy9mbGluay1leGFtcGxlcy1zdHJlYW1pbmcvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2ZsaW5rL3N0cmVhbWluZy9leGFtcGxlcy9zdGF0ZW1hY2hpbmUvU3RhdGVNYWNoaW5lRXhhbXBsZS5qYXZh" title="https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/StateMachineExample.java">https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/StateMachineExample.java<i class="fa fa-external-link"></i></span></p>
<p>感兴趣的同学可直接查看完整源代码，在此截取部分。如图为 Flink 作业的主方法与主函数中的内容，前面的输入、后面的输出以及一些个性化的配置项都已去掉，仅保留了主干。</p>
<p>首先 events 是一个 DataStream，通过 env.addSource 加载数据进来，接下来有一个 DataStream 叫 alerts，先 keyby 一个 sourceAddress，然后在 flatMap 一个StateMachineMapper。StateMachineMapper 就是一个状态机，状态机指有不同的状态与状态间有不同的转换关系的结合，以买东西的过程简单举例。首先下订单，订单生成后状态为待付款，当再来一个事件状态付款成功，则事件的状态将会从待付款变为已付款，待发货。已付款，待发货的状态再来一个事件发货，订单状态将会变为配送中，配送中的状态再来一个事件签收，则该订单的状态就变为已签收。在整个过程中，随时都可以来一个事件，取消订单，无论哪个状态，一旦触发了取消订单事件最终就会将状态转移到已取消，至此状态就结束了。</p>
<p>Flink 写状态机是如何实现的？首先这是一个 RichFlatMapFunction，要用 Keyed State getRuntimeContext，getRuntimeContext 的过程中需要 RichFunction，所以需要在 open 方法中获取 currentState ，然后 getState，currentState 保存的是当前状态机上的状态。</p>
<p>如果刚下订单，那么 currentState 就是待付款状态，初始化后，currentState 就代表订单完成。订单来了后，就会走 flatMap 这个方法，在 flatMap 方法中，首先定义一个 State，从 currentState 取出，即 Value，Value 取值后先判断值是否为空，如果 sourceAddress state 是空，则说明没有被使用过，那么此状态应该为刚创建订单的初始状态，即待付款。然后赋值 state = State.Initial，注意此处的 State 是本地的变量，而不是 Flink 中管理的状态，将它的值从状态中取出。接下来在本地又会来一个变量，然后 transition，将事件对它的影响加上，刚才待付款的订单收到付款成功的事件，就会变成已付款，待发货，然后 nextState 即可算出。此外，还需要判断 State 是否合法，比如一个已签收的订单，又来一个状态叫取消订单，会发现已签收订单不能被取消，此时这个状态就会下发，订单状态为非法状态。</p>
<p>如果不是非法的状态，还要看该状态是否已经无法转换，比如这个状态变为已取消时，就不会在有其他的状态再发生了，此时就会从 state 中 clear。clear 是所有的 Flink 管理 keyed state 都有的公共方法，意味着将信息删除，如果既不是一个非法状态也不是一个结束状态，后面可能还会有更多的转换，此时需要将订单的当前状态 update ，这样就完成了 ValueState 的初始化、取值、更新以及清零，在整个过程中状态机的作用就是将非法的状态进行下发，方便下游进行处理。其他的状态也是类似的使用方式。</p>
<h3 id="三-容错机制与故障恢复"><a href="#三-容错机制与故障恢复" class="headerlink" title="三.容错机制与故障恢复"></a>三.容错机制与故障恢复</h3><h4 id="1-状态如何保存及恢复"><a href="#1-状态如何保存及恢复" class="headerlink" title="1.状态如何保存及恢复"></a>1.状态如何保存及恢复</h4><p><img src="https://tva1.sinaimg.cn/large/008eGmZEgy1gmtz5rfwzwj30ti0gjtc9.jpg" alt="11"></p>
<p>Flink 状态保存主要依靠 Checkpoint 机制，Checkpoint 会定时制作分布式快照，对程序中的状态进行备份。分布式快照是如何实现的可以参考【第二课时】的内容，这里就不在阐述分布式快照具体是如何实现的。分布式快照 Checkpoint 完成后，当作业发生故障了如何去恢复？假如作业分布跑在 3 台机器上，其中一台挂了。这个时候需要把进程或者线程移到 active 的 2 台机器上，此时还需要将整个作业的所有 Task 都回滚到最后一次成功 Checkpoint 中的状态，然后从该点开始继续处理。</p>
<p>如果要从 Checkpoint 恢复，必要条件是数据源需要支持数据重新发送。Checkpoint恢复后， Flink 提供两种一致性语义，一种是恰好一次，一种是至少一次。在做 Checkpoint时，可根据 Barries 对齐来判断是恰好一次还是至少一次，如果对齐，则为恰好一次，否则没有对齐即为至少一次。如果作业是单线程处理，也就是说 Barries 是不需要对齐的；如果只有一个 Checkpoint 在做，不管什么时候从 Checkpoint 恢复，都会恢复到刚才的状态；如果有多个节点，假如一个数据的 Barries 到了，另一个 Barries 还没有来，内存中的状态如果已经存储。那么这 2 个流是不对齐的，恢复的时候其中一个流可能会有重复。</p>
<p>Checkpoint 通过代码的实现方法如下：</p>
<ul>
<li>首先从作业的运行环境 env.enableCheckpointing 传入 1000，意思是做 2 个 Checkpoint 的事件间隔为 1 秒。Checkpoint 做的越频繁，恢复时追数据就会相对减少，同时 Checkpoint 相应的也会有一些 IO 消耗。</li>
<li>接下来是设置 Checkpoint 的 model，即设置了 Exactly_Once 语义，并且需要 Barries 对齐，这样可以保证消息不会丢失也不会重复。</li>
<li>setMinPauseBetweenCheckpoints 是 2 个 Checkpoint 之间最少是要等 500ms，也就是刚做完一个 Checkpoint。比如某个 Checkpoint 做了700ms，按照原则过 300ms 应该是做下一个 Checkpoint，因为设置了 1000ms 做一次 Checkpoint 的，但是中间的等待时间比较短，不足 500ms 了，需要多等 200ms，因此以这样的方式防止 Checkpoint 太过于频繁而导致业务处理的速度下降。</li>
<li>setCheckpointTimeout 表示做 Checkpoint 多久超时，如果 Checkpoint 在 1min 之内尚未完成，说明 Checkpoint 超时失败。<br>setMaxConcurrentCheckpoints 表示同时有多少个 Checkpoint 在做快照，这个可以根据具体需求去做设置。</li>
<li>enableExternalizedCheckpoints 表示下 Cancel 时是否需要保留当前的 Checkpoint，默认 Checkpoint 会在整个作业 Cancel 时被删除。Checkpoint 是作业级别的保存点。</li>
</ul>
<p>上面讲过，除了故障恢复之外，还需要可以手动去调整并发重新分配这些状态。手动调整并发，必须要重启作业并会提示 Checkpoint 已经不存在，那么作业如何恢复数据？</p>
<p>一方面 Flink 在 Cancel 时允许在外部介质保留 Checkpoint ；另一方面，Flink 还有另外一个机制是 SavePoint。</p>
<p><img src="https://tva1.sinaimg.cn/large/008eGmZEgy1gmtz5vc8gbj30te0ggwhx.jpg" alt="12"></p>
<p>Savepoint 与 Checkpoint 类似，同样是把状态存储到外部介质。当作业失败时，可以从外部恢复。Savepoint 与 Checkpoint 有什么区别呢？</p>
<ul>
<li>从触发管理方式来讲，Checkpoint 由 Flink 自动触发并管理，而 Savepoint 由用户手动触发并人肉管理；</li>
<li>从用途来讲，Checkpoint 在 Task 发生异常时快速恢复，例如网络抖动或超时异常，而 Savepoint 有计划地进行备份，使作业能停止后再恢复，例如修改代码、调整并发；</li>
<li>最后从特点来讲，Checkpoint 比较轻量级，作业出现问题会自动从故障中恢复，在作业停止后默认清除；而 Savepoint 比较持久，以标准格式存储，允许代码或配置发生改变，恢复需要启动作业手动指定一个路径恢复。</li>
</ul>
<h4 id="2-可选的状态存储方式"><a href="#2-可选的状态存储方式" class="headerlink" title="2.可选的状态存储方式"></a>2.可选的状态存储方式</h4><p><img src="https://tva1.sinaimg.cn/large/008eGmZEgy1gmtz6bfbeqj30tg0gin0a.jpg" alt="13"></p>
<p>Checkpoint 的存储，第一种是内存存储，即 MemoryStateBackend，构造方法是设置最大的StateSize，选择是否做异步快照，这种存储状态本身存储在 TaskManager 节点也就是执行节点内存中的，因为内存有容量限制，所以单个 State maxStateSize 默认 5 M，且需要注意 maxStateSize &lt;= akka.framesize 默认 10 M。Checkpoint 存储在 JobManager 内存中，因此总大小不超过 JobManager 的内存。推荐使用的场景为：本地测试、几乎无状态的作业，比如 ETL、JobManager 不容易挂，或挂掉影响不大的情况。不推荐在生产场景使用。</p>
<p><img src="https://tva1.sinaimg.cn/large/008eGmZEgy1gmtz6emclxj30tf0ggwhi.jpg" alt="14"></p>
<p>另一种就是在文件系统上的 FsStateBackend ，构建方法是需要传一个文件路径和是否异步快照。State 依然在 TaskManager 内存中，但不会像 MemoryStateBackend 有 5 M 的设置上限，Checkpoint 存储在外部文件系统（本地或 HDFS），打破了总大小 Jobmanager 内存的限制。容量限制上，单 TaskManager 上 State 总量不超过它的内存，总大小不超过配置的文件系统容量。推荐使用的场景、常规使用状态的作业、例如分钟级窗口聚合或 join、需要开启HA的作业。</p>
<p><img src="https://tva1.sinaimg.cn/large/008eGmZEgy1gmtz6h5adhj30tg0ghn0p.jpg" alt="15"></p>
<p>还有一种存储为 RocksDBStateBackend ，RocksDB 是一个 key/value 的内存存储系统，和其他的 key/value 一样，先将状态放到内存中，如果内存快满时，则写入到磁盘中，但需要注意 RocksDB 不支持同步的 Checkpoint，构造方法中没有同步快照这个选项。不过 RocksDB 支持增量的 Checkpoint，也是目前唯一增量 Checkpoint 的 Backend，意味着每次用户不需要将所有状态都写进去，将增量的改变的状态写进去即可。它的 Checkpoint 存储在外部文件系统（本地或HDFS），其容量限制只要单个 TaskManager 上 State 总量不超过它的内存+磁盘，单 Key最大 2G，总大小不超过配置的文件系统容量即可。推荐使用的场景为：超大状态的作业，例如天级窗口聚合、需要开启 HA 的作业、最好是对状态读写性能要求不高的作业。</p>
<h3 id="四-总结"><a href="#四-总结" class="headerlink" title="四.总结"></a>四.总结</h3><h4 id="1-为什么要使用状态？"><a href="#1-为什么要使用状态？" class="headerlink" title="1.为什么要使用状态？"></a>1.为什么要使用状态？</h4><p>前面提到有状态的作业要有有状态的逻辑，有状态的逻辑是因为数据之间存在关联，单条数据是没有办法把所有的信息给表现出来。所以需要通过状态来满足业务逻辑。</p>
<h4 id="2-为什么要管理状态？"><a href="#2-为什么要管理状态？" class="headerlink" title="2.为什么要管理状态？"></a>2.为什么要管理状态？</h4><p>使用了状态，为什么要管理状态？因为实时作业需要7*24不间断的运行，需要应对不可靠的因素而带来的影响。</p>
<h4 id="3-如何选择状态的类型和存储方式？"><a href="#3-如何选择状态的类型和存储方式？" class="headerlink" title="3.如何选择状态的类型和存储方式？"></a>3.如何选择状态的类型和存储方式？</h4><p>那如何选择状态的类型和存储方式？结合前面的内容，可以看到，首先是要分析清楚业务场景；比如想要做什么，状态到底大不大。比较各个方案的利弊，选择根据需求合适的状态类型和存储方式即可。</p>
<p>视频回顾：<span class="exturl" data-url="aHR0cHM6Ly93d3cuYmlsaWJpbGkuY29tL3ZpZGVvL2F2NDk3MzYxMDI/ZnJvbT1zZWFyY2gmYW1wO3NlaWQ9NTczOTUzMDQ4NjAxMTEzMjQ2OA==" title="https://www.bilibili.com/video/av49736102?from=search&amp;seid=5739530486011132468">https://www.bilibili.com/video/av49736102?from=search&amp;seid=5739530486011132468<i class="fa fa-external-link"></i></span></p>

      
    </div>

    

    
    
    

    

    
      
    
    

    

    <footer class="post-footer">
      
        
          
        
        <div class="post-tags">
          
            <a href="/tags/Flink/" rel="tag"><i class="fa fa-tag"></i> Flink</a>
          
        </div>
      

      
      
        <div class="post-widgets">
        

        

        
          
          <div class="social_share">
            
              <div>
                

<script src="//cdn.jsdelivr.net/npm/ilyabirman-likely@2/release/likely.js"></script>



<link rel="stylesheet" href="//cdn.jsdelivr.net/npm/ilyabirman-likely@2/release/likely.css">


  


<div class="likely likely-light">
	
 	 	<div class="twitter">Tweet</div>
	
 	 	<div class="facebook">Share</div>
	
 	 	<div class="linkedin">Link</div>
	
 	 	<div class="gplus">Plus</div>
	
 	 	<div class="vkontakte">Share</div>
	
 	 	<div class="odnoklassniki">Class</div>
	
 	 	<div class="telegram">Send</div>
	
 	 	<div class="whatsapp">Send</div>
	
 	 	<div class="pinterest">Pin</div>
	
</div>

              </div>
            
            
            
              <div>
                
  <div class="bdsharebuttonbox">
    <a href="#" class="bds_tsina" data-cmd="tsina" title="分享到新浪微博"></a>
    <a href="#" class="bds_douban" data-cmd="douban" title="分享到豆瓣网"></a>
    <a href="#" class="bds_sqq" data-cmd="sqq" title="分享到QQ好友"></a>
    <a href="#" class="bds_qzone" data-cmd="qzone" title="分享到QQ空间"></a>
    <a href="#" class="bds_weixin" data-cmd="weixin" title="分享到微信"></a>
    <a href="#" class="bds_tieba" data-cmd="tieba" title="分享到百度贴吧"></a>
    <a href="#" class="bds_twi" data-cmd="twi" title="分享到Twitter"></a>
    <a href="#" class="bds_fbook" data-cmd="fbook" title="分享到Facebook"></a>
    <a href="#" class="bds_more" data-cmd="more"></a>
    <a class="bds_count" data-cmd="count"></a>
  </div>
  <script>
    window._bd_share_config = {
      "common": {
        "bdText": "",
        "bdMini": "2",
        "bdMiniList": false,
        "bdPic": ""
      },
      "share": {
        "bdSize": "16",
        "bdStyle": "0"
      },
      "image": {
        "viewList": ["tsina", "douban", "sqq", "qzone", "weixin", "twi", "fbook"],
        "viewText": "分享到：",
        "viewSize": "16"
      }
    }
  </script>

<script>
  with(document)0[(getElementsByTagName('head')[0]||body).appendChild(createElement('script')).src='//bdimg.share.baidu.com/static/api/js/share.js?cdnversion='+~(-new Date()/36e5)];
</script>

              </div>
            
          </div>
        
        </div>
      
      

      
        <div class="post-nav">
          <div class="post-nav-next post-nav-item">
            
              <a href="/Apache Flink 零基础入门（五）：流处理核心组件Time.html" rel="next" title="Apache Flink 零基础入门（五）：流处理核心组件Time">
                <i class="fa fa-chevron-left"></i> Apache Flink 零基础入门（五）：流处理核心组件Time
              </a>
            
          </div>

          <span class="post-nav-divider"></span>

          <div class="post-nav-prev post-nav-item">
            
              <a href="/Apache Flink 零基础入门（七）-TableAPI编程.html" rel="prev" title="Apache Flink 零基础入门（七）:Table API 编程">
                Apache Flink 零基础入门（七）:Table API 编程 <i class="fa fa-chevron-right"></i>
              </a>
            
          </div>
        </div>
      

      
      
    </footer>
  </div>
  
  
  
  </article>


  </div>


          </div>
          

  
    <div class="comments" id="comments">
      <div id="lv-container" data-id="city" data-uid="MTAyMC8yOTk3My82NTM4"></div>
    </div>

  



        </div>
        
          
  
  <div class="sidebar-toggle">
    <div class="sidebar-toggle-line-wrap">
      <span class="sidebar-toggle-line sidebar-toggle-line-first"></span>
      <span class="sidebar-toggle-line sidebar-toggle-line-middle"></span>
      <span class="sidebar-toggle-line sidebar-toggle-line-last"></span>
    </div>
  </div>

  <aside id="sidebar" class="sidebar">
    <div class="sidebar-inner">

      

      
        <ul class="sidebar-nav motion-element">
          <li class="sidebar-nav-toc sidebar-nav-active" data-target="post-toc-wrap">
            文章目录
          </li>
          <li class="sidebar-nav-overview" data-target="site-overview-wrap">
            站点概览
          </li>
        </ul>
      

      <div class="site-overview-wrap sidebar-panel">
        <div class="site-overview">
          <div class="site-author motion-element" itemprop="author" itemscope="" itemtype="http://schema.org/Person">
            
              <img class="site-author-image" itemprop="image" src="https://hexoblog-1254111960.cos.ap-guangzhou.myqcloud.com/HexoBlog-tou.jpg" alt="Daniel X">
            
              <p class="site-author-name" itemprop="name">Daniel X</p>
              <div class="site-description motion-element" itemprop="description">專注于大数据技術，分享干货</div>
          </div>

          
            <nav class="site-state motion-element">
              
                <div class="site-state-item site-state-posts">
                
                  <a href="/archives/">
                
                    <span class="site-state-item-count">125</span>
                    <span class="site-state-item-name">日志</span>
                  </a>
                </div>
              

              
                
                
                <div class="site-state-item site-state-categories">
                  
                    
                      <a href="/categories">
                    
                  
                    
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                    <span class="site-state-item-count">15</span>
                    <span class="site-state-item-name">分类</span>
                  </a>
                </div>
              

              
                
                
                <div class="site-state-item site-state-tags">
                  
                    
                      <a href="/tags">
                    
                  
                    
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                    <span class="site-state-item-count">63</span>
                    <span class="site-state-item-name">标签</span>
                  </a>
                </div>
              
            </nav>
          

          

          

          
            <div class="links-of-author motion-element">
              
                <span class="links-of-author-item">
                  
                  
                    
                  
                  
                    
                  
                  <span class="exturl" data-url="aHR0cHM6Ly9naXRodWIuY29tL2R1ZGVmdQ==" title="GitHub &rarr; https://github.com/dudefu"><i class="fa fa-fw fa-github"></i>GitHub</span>
                </span>
              
                <span class="links-of-author-item">
                  
                  
                    
                  
                  
                    
                  
                  <span class="exturl" data-url="bWFpbHRvOmR1ZGVmdUBmb3htYWlsLmNvbT9zdWJqZWN0PUhlbGxvJTIwYWdhaW4=" title="E-mail &rarr; mailto:dudefu@foxmail.com?subject=Hello%20again"><i class="fa fa-fw fa-envelope"></i>E-mail</span>
                </span>
              
                <span class="links-of-author-item">
                  
                  
                    
                  
                  
                    
                  
                  <span class="exturl" data-url="aHR0cHM6Ly93ZWliby5jb20vZHVkZWZ1" title="Weibo &rarr; https://weibo.com/dudefu"><i class="fa fa-fw fa-weibo"></i>Weibo</span>
                </span>
              
                <span class="links-of-author-item">
                  
                  
                    
                  
                  
                    
                  
                  <span class="exturl" data-url="aHR0cHM6Ly93cGEucXEuY29tL21zZ3JkP3Y9MyZ1aW49MTU3NzU3MTk1OSZzaXRlPWR1ZGVmdS5pbmZvJm1lbnU9eWVz" title="QQ &rarr; https://wpa.qq.com/msgrd?v=3&uin=1577571959&site=dudefu.info&menu=yes"><i class="fa fa-fw fa-qq"></i>QQ</span>
                </span>
              
            </div>
          

          

          
          

          
            
          
          

        </div>
      </div>

      
      <!--noindex-->
        <div class="post-toc-wrap motion-element sidebar-panel sidebar-panel-active">
          <div class="post-toc">

            
            
            
            

            
              <div class="post-toc-content"><ol class="nav"><li class="nav-item nav-level-3"><a class="nav-link" href="#一-状态管理的基本概念"><span class="nav-number">1.</span> <span class="nav-text">一.状态管理的基本概念</span></a><ol class="nav-child"><li class="nav-item nav-level-4"><a class="nav-link" href="#1-什么是状态"><span class="nav-number">1.1.</span> <span class="nav-text">1.什么是状态</span></a></li><li class="nav-item nav-level-4"><a class="nav-link" href="#2-为什么要管理状态"><span class="nav-number">1.2.</span> <span class="nav-text">2.为什么要管理状态</span></a></li><li class="nav-item nav-level-4"><a class="nav-link" href="#3-理想的状态管理"><span class="nav-number">1.3.</span> <span class="nav-text">3.理想的状态管理</span></a></li></ol></li><li class="nav-item nav-level-3"><a class="nav-link" href="#二-Flink-状态的类型与使用示例"><span class="nav-number">2.</span> <span class="nav-text">二.Flink 状态的类型与使用示例</span></a><ol class="nav-child"><li class="nav-item nav-level-4"><a class="nav-link" href="#1-Managed-State-amp-Raw-State"><span class="nav-number">2.1.</span> <span class="nav-text">1.Managed State &amp; Raw State</span></a></li><li class="nav-item nav-level-4"><a class="nav-link" href="#2-Keyed-State-amp-Operator-State"><span class="nav-number">2.2.</span> <span class="nav-text">2.Keyed State &amp; Operator State</span></a></li><li class="nav-item nav-level-4"><a class="nav-link" href="#3-Keyed-State-使用示例"><span class="nav-number">2.3.</span> <span class="nav-text">3.Keyed State 使用示例</span></a></li></ol></li><li class="nav-item nav-level-3"><a class="nav-link" href="#三-容错机制与故障恢复"><span class="nav-number">3.</span> <span class="nav-text">三.容错机制与故障恢复</span></a><ol class="nav-child"><li class="nav-item nav-level-4"><a class="nav-link" href="#1-状态如何保存及恢复"><span class="nav-number">3.1.</span> <span class="nav-text">1.状态如何保存及恢复</span></a></li><li class="nav-item nav-level-4"><a class="nav-link" href="#2-可选的状态存储方式"><span class="nav-number">3.2.</span> <span class="nav-text">2.可选的状态存储方式</span></a></li></ol></li><li class="nav-item nav-level-3"><a class="nav-link" href="#四-总结"><span class="nav-number">4.</span> <span class="nav-text">四.总结</span></a><ol class="nav-child"><li class="nav-item nav-level-4"><a class="nav-link" href="#1-为什么要使用状态？"><span class="nav-number">4.1.</span> <span class="nav-text">1.为什么要使用状态？</span></a></li><li class="nav-item nav-level-4"><a class="nav-link" href="#2-为什么要管理状态？"><span class="nav-number">4.2.</span> <span class="nav-text">2.为什么要管理状态？</span></a></li><li class="nav-item nav-level-4"><a class="nav-link" href="#3-如何选择状态的类型和存储方式？"><span class="nav-number">4.3.</span> <span class="nav-text">3.如何选择状态的类型和存储方式？</span></a></li></ol></li></ol></div>
            

          </div>
        </div>
      <!--/noindex-->
      

      

    </div>
  </aside>
  


        
      </div>
    </main>

    <footer id="footer" class="footer">
      <div class="footer-inner">
        <div class="copyright">  <span class="exturl" data-url="aHR0cDovL3d3dy5iZWlhbi5taWl0Lmdvdi5jbg==">粤ICP备18110871号 </span>&copy; 2017 – <span itemprop="copyrightYear">2021</span>
  <span class="with-love" id="animate">
    <i class="fa fa-spinner"></i>
  </span>
  <span class="author" itemprop="copyrightHolder">dudefu</span>

  

  
</div>

<!--

  <div class="powered-by">由 <span class="exturl theme-link" data-url="aHR0cHM6Ly9oZXhvLmlv">Hexo</span> 强力驱动 v3.8.0</div>



  <span class="post-meta-divider">|</span>



  <div class="theme-info">主题 – <span class="exturl theme-link" data-url="aHR0cHM6Ly90aGVtZS1uZXh0Lm9yZw==">NexT.Pisces</span> v7.1.2</div>

-->



        








        
      </div>
    </footer>

    
      <div class="back-to-top">
        <i class="fa fa-arrow-up"></i>
        
          <span id="scrollpercent"><span>0</span>%</span>
        
      </div>
    

    

    

    
  </div>

  

<script>
  if (Object.prototype.toString.call(window.Promise) !== '[object Function]') {
    window.Promise = null;
  }
</script>














  
    
    
      
    
  
  <script color="26,26,26" opacity="0.5" zindex="-1" count="99" src="//cdn.jsdelivr.net/gh/theme-next/theme-next-canvas-nest@1/canvas-nest.min.js"></script>









  
  
  <script id="ribbon" size="300" alpha="0.6" zindex="-1" src="/lib/canvas-ribbon/canvas-ribbon.js"></script>



  



  
  <script src="/lib/jquery/index.js?v=3.4.1"></script>

  
  <script src="/lib/velocity/velocity.min.js?v=1.2.1"></script>

  
  <script src="/lib/velocity/velocity.ui.min.js?v=1.2.1"></script>

  
  <script src="/lib/reading_progress/reading_progress.js"></script>


  


  <script src="/js/utils.js?v=7.1.2"></script>

  <script src="/js/motion.js?v=7.1.2"></script>



  
  


  <script src="/js/affix.js?v=7.1.2"></script>

  <script src="/js/schemes/pisces.js?v=7.1.2"></script>



  
  <script src="/js/scrollspy.js?v=7.1.2"></script>
<script src="/js/post-details.js?v=7.1.2"></script>



  


  <script src="/js/next-boot.js?v=7.1.2"></script>


  
  <script src="/js/js.cookie.js?v=7.1.2"></script>
  <script src="/js/scroll-cookie.js?v=7.1.2"></script>


  
  <script src="/js/exturl.js?v=7.1.2"></script>


  

  
  

<script src="//cdn1.lncld.net/static/js/3.11.1/av-min.js"></script>



<script src="//unpkg.com/valine/dist/Valine.min.js"></script>

<script>
  var GUEST = ['nick', 'mail', 'link'];
  var guest = 'nick,mail,link';
  guest = guest.split(',').filter(function(item) {
    return GUEST.indexOf(item) > -1;
  });
  new Valine({
    el: '#comments',
    verify: true,
    notify: true,
    appId: '1N5rpk874DGudJw2wCL9J011-gzGzoHsz',
    appKey: '9Y83e6suJgx567wtxhKy45IN',
    placeholder: 'Just go go',
    avatar: 'mm',
    meta: guest,
    pageSize: '10' || 10,
    visitor: true,
    lang: 'zk-cn' || 'zh-cn'
  });
</script>




  
    <script>
  window.livereOptions = {
    refer: 'Apache Flink 零基础入门（六）：状态管理及容错机制.html'
  };
  (function(d, s) {
    var j, e = d.getElementsByTagName(s)[0];
    if (typeof LivereTower === 'function') { return; }
    j = d.createElement(s);
    j.src = 'https://cdn-city.livere.com/js/embed.dist.js';
    j.async = true;
    e.parentNode.insertBefore(j, e);
  })(document, 'script');
</script>

  


  
  <script>
    // Popup Window;
    var isfetched = false;
    var isXml = true;
    // Search DB path;
    var search_path = "search.xml";
    if (search_path.length === 0) {
      search_path = "search.xml";
    } else if (/json$/i.test(search_path)) {
      isXml = false;
    }
    var path = "/" + search_path;
    // monitor main search box;

    var onPopupClose = function (e) {
      $('.popup').hide();
      $('#local-search-input').val('');
      $('.search-result-list').remove();
      $('#no-result').remove();
      $(".local-search-pop-overlay").remove();
      $('body').css('overflow', '');
    }

    function proceedsearch() {
      $("body")
        .append('<div class="search-popup-overlay local-search-pop-overlay"></div>')
        .css('overflow', 'hidden');
      $('.search-popup-overlay').click(onPopupClose);
      $('.popup').toggle();
      var $localSearchInput = $('#local-search-input');
      $localSearchInput.attr("autocapitalize", "none");
      $localSearchInput.attr("autocorrect", "off");
      $localSearchInput.focus();
    }

    // search function;
    var searchFunc = function(path, search_id, content_id) {
      'use strict';

      // start loading animation
      $("body")
        .append('<div class="search-popup-overlay local-search-pop-overlay">' +
          '<div id="search-loading-icon">' +
          '<i class="fa fa-spinner fa-pulse fa-5x fa-fw"></i>' +
          '</div>' +
          '</div>')
        .css('overflow', 'hidden');
      $("#search-loading-icon").css('margin', '20% auto 0 auto').css('text-align', 'center');

      

      $.ajax({
        url: path,
        dataType: isXml ? "xml" : "json",
        async: true,
        success: function(res) {
          // get the contents from search data
          isfetched = true;
          $('.popup').detach().appendTo('.header-inner');
          var datas = isXml ? $("entry", res).map(function() {
            return {
              title: $("title", this).text(),
              content: $("content",this).text(),
              url: $("url" , this).text()
            };
          }).get() : res;
          var input = document.getElementById(search_id);
          var resultContent = document.getElementById(content_id);
          var inputEventFunction = function() {
            var searchText = input.value.trim().toLowerCase();
            var keywords = searchText.split(/[\s\-]+/);
            if (keywords.length > 1) {
              keywords.push(searchText);
            }
            var resultItems = [];
            if (searchText.length > 0) {
              // perform local searching
              datas.forEach(function(data) {
                var isMatch = false;
                var hitCount = 0;
                var searchTextCount = 0;
                var title = data.title.trim();
                var titleInLowerCase = title.toLowerCase();
                var content = data.content.trim().replace(/<[^>]+>/g,"");
                
                var contentInLowerCase = content.toLowerCase();
                var articleUrl = decodeURIComponent(data.url).replace(/\/{2,}/g, '/');
                var indexOfTitle = [];
                var indexOfContent = [];
                // only match articles with not empty titles
                if(title != '') {
                  keywords.forEach(function(keyword) {
                    function getIndexByWord(word, text, caseSensitive) {
                      var wordLen = word.length;
                      if (wordLen === 0) {
                        return [];
                      }
                      var startPosition = 0, position = [], index = [];
                      if (!caseSensitive) {
                        text = text.toLowerCase();
                        word = word.toLowerCase();
                      }
                      while ((position = text.indexOf(word, startPosition)) > -1) {
                        index.push({position: position, word: word});
                        startPosition = position + wordLen;
                      }
                      return index;
                    }

                    indexOfTitle = indexOfTitle.concat(getIndexByWord(keyword, titleInLowerCase, false));
                    indexOfContent = indexOfContent.concat(getIndexByWord(keyword, contentInLowerCase, false));
                  });
                  if (indexOfTitle.length > 0 || indexOfContent.length > 0) {
                    isMatch = true;
                    hitCount = indexOfTitle.length + indexOfContent.length;
                  }
                }

                // show search results

                if (isMatch) {
                  // sort index by position of keyword

                  [indexOfTitle, indexOfContent].forEach(function (index) {
                    index.sort(function (itemLeft, itemRight) {
                      if (itemRight.position !== itemLeft.position) {
                        return itemRight.position - itemLeft.position;
                      } else {
                        return itemLeft.word.length - itemRight.word.length;
                      }
                    });
                  });

                  // merge hits into slices

                  function mergeIntoSlice(text, start, end, index) {
                    var item = index[index.length - 1];
                    var position = item.position;
                    var word = item.word;
                    var hits = [];
                    var searchTextCountInSlice = 0;
                    while (position + word.length <= end && index.length != 0) {
                      if (word === searchText) {
                        searchTextCountInSlice++;
                      }
                      hits.push({position: position, length: word.length});
                      var wordEnd = position + word.length;

                      // move to next position of hit

                      index.pop();
                      while (index.length != 0) {
                        item = index[index.length - 1];
                        position = item.position;
                        word = item.word;
                        if (wordEnd > position) {
                          index.pop();
                        } else {
                          break;
                        }
                      }
                    }
                    searchTextCount += searchTextCountInSlice;
                    return {
                      hits: hits,
                      start: start,
                      end: end,
                      searchTextCount: searchTextCountInSlice
                    };
                  }

                  var slicesOfTitle = [];
                  if (indexOfTitle.length != 0) {
                    slicesOfTitle.push(mergeIntoSlice(title, 0, title.length, indexOfTitle));
                  }

                  var slicesOfContent = [];
                  while (indexOfContent.length != 0) {
                    var item = indexOfContent[indexOfContent.length - 1];
                    var position = item.position;
                    var word = item.word;
                    // cut out 100 characters
                    var start = position - 20;
                    var end = position + 80;
                    if(start < 0){
                      start = 0;
                    }
                    if (end < position + word.length) {
                      end = position + word.length;
                    }
                    if(end > content.length){
                      end = content.length;
                    }
                    slicesOfContent.push(mergeIntoSlice(content, start, end, indexOfContent));
                  }

                  // sort slices in content by search text's count and hits' count

                  slicesOfContent.sort(function (sliceLeft, sliceRight) {
                    if (sliceLeft.searchTextCount !== sliceRight.searchTextCount) {
                      return sliceRight.searchTextCount - sliceLeft.searchTextCount;
                    } else if (sliceLeft.hits.length !== sliceRight.hits.length) {
                      return sliceRight.hits.length - sliceLeft.hits.length;
                    } else {
                      return sliceLeft.start - sliceRight.start;
                    }
                  });

                  // select top N slices in content

                  var upperBound = parseInt('1');
                  if (upperBound >= 0) {
                    slicesOfContent = slicesOfContent.slice(0, upperBound);
                  }

                  // highlight title and content

                  function highlightKeyword(text, slice) {
                    var result = '';
                    var prevEnd = slice.start;
                    slice.hits.forEach(function (hit) {
                      result += text.substring(prevEnd, hit.position);
                      var end = hit.position + hit.length;
                      result += '<b class="search-keyword">' + text.substring(hit.position, end) + '</b>';
                      prevEnd = end;
                    });
                    result += text.substring(prevEnd, slice.end);
                    return result;
                  }

                  var resultItem = '';

                  if (slicesOfTitle.length != 0) {
                    resultItem += "<li><a href='" + articleUrl + "' class='search-result-title'>" + highlightKeyword(title, slicesOfTitle[0]) + "</a>";
                  } else {
                    resultItem += "<li><a href='" + articleUrl + "' class='search-result-title'>" + title + "</a>";
                  }

                  slicesOfContent.forEach(function (slice) {
                    resultItem += "<a href='" + articleUrl + "'>" +
                      "<p class=\"search-result\">" + highlightKeyword(content, slice) +
                      "...</p>" + "</a>";
                  });

                  resultItem += "</li>";
                  resultItems.push({
                    item: resultItem,
                    searchTextCount: searchTextCount,
                    hitCount: hitCount,
                    id: resultItems.length
                  });
                }
              })
            };
            if (keywords.length === 1 && keywords[0] === "") {
              resultContent.innerHTML = '<div id="no-result"><i class="fa fa-search fa-5x"></i></div>'
            } else if (resultItems.length === 0) {
              resultContent.innerHTML = '<div id="no-result"><i class="fa fa-frown-o fa-5x"></i></div>'
            } else {
              resultItems.sort(function (resultLeft, resultRight) {
                if (resultLeft.searchTextCount !== resultRight.searchTextCount) {
                  return resultRight.searchTextCount - resultLeft.searchTextCount;
                } else if (resultLeft.hitCount !== resultRight.hitCount) {
                  return resultRight.hitCount - resultLeft.hitCount;
                } else {
                  return resultRight.id - resultLeft.id;
                }
              });
              var searchResultList = '<ul class=\"search-result-list\">';
              resultItems.forEach(function (result) {
                searchResultList += result.item;
              })
              searchResultList += "</ul>";
              resultContent.innerHTML = searchResultList;
            }
          }

          if ('auto' === 'auto') {
            input.addEventListener('input', inputEventFunction);
          } else {
            $('.search-icon').click(inputEventFunction);
            input.addEventListener('keypress', function (event) {
              if (event.keyCode === 13) {
                inputEventFunction();
              }
            });
          }

          // remove loading animation
          $(".local-search-pop-overlay").remove();
          $('body').css('overflow', '');

          proceedsearch();
        }
      });
    }

    // handle and trigger popup window;
    $('.popup-trigger').click(function(e) {
      e.stopPropagation();
      if (isfetched === false) {
        searchFunc(path, 'local-search-input', 'local-search-result');
      } else {
        proceedsearch();
      };
    });

    $('.popup-btn-close').click(onPopupClose);
    $('.popup').click(function(e){
      e.stopPropagation();
    });
    $(document).on('keyup', function (event) {
      var shouldDismissSearchPopup = event.which === 27 &&
        $('.search-popup').is(':visible');
      if (shouldDismissSearchPopup) {
        onPopupClose();
      }
    });
  </script>





  

  
  <script src="https://www.gstatic.com/firebasejs/4.6.0/firebase.js"></script>
  <script src="https://www.gstatic.com/firebasejs/4.6.0/firebase-firestore.js"></script>
  
    <script src="https://cdnjs.cloudflare.com/ajax/libs/bluebird/3.5.1/bluebird.core.min.js"></script>
  
  <script>
    (function () {

      firebase.initializeApp({
        apiKey: '',
        projectId: ''
      })

      function getCount(doc, increaseCount) {
        //increaseCount will be false when not in article page

        return doc.get().then(function (d) {
          var count
          if (!d.exists) { //has no data, initialize count
            if (increaseCount) {
              doc.set({
                count: 1
              })
              count = 1
            }
            else {
              count = 0
            }
          }
          else { //has data
            count = d.data().count
            if (increaseCount) {
              if (!(window.localStorage && window.localStorage.getItem(title))) { //if first view this article
                doc.set({ //increase count
                  count: count + 1
                })
                count++
              }
            }
          }
          if (window.localStorage && increaseCount) { //mark as visited
            localStorage.setItem(title, true)
          }

          return count
        })
      }

      function appendCountTo(el) {
        return function (count) {
          $(el).append(
            $('<span>').addClass('post-visitors-count').append(
              $('<span>').addClass('post-meta-divider').text('|')
            ).append(
              $('<span>').addClass('post-meta-item-icon').append(
                $('<i>').addClass('fa fa-users')
              )
              ).append($('<span>').text('阅读次数 ' + count))
          )
        }
      }

      var db = firebase.firestore()
      var articles = db.collection('articles')

      //https://hexo.io/docs/variables.html
      var isPost = 'Apache Flink 零基础入门教程（六）：状态管理及容错机制'.length > 0
      var isArchive = '' === 'true'
      var isCategory = ''.length > 0
      var isTag = ''.length > 0

      if (isPost) { //is article page
        var title = 'Apache Flink 零基础入门教程（六）：状态管理及容错机制'
        var doc = articles.doc(title)

        getCount(doc, true).then(appendCountTo($('.post-meta')))
      }
      else if (!isArchive && !isCategory && !isTag) { //is index page
        var titles = [] //array to titles

        var postsstr = '' //if you have a better way to get titles of posts, please change it
        eval(postsstr)

        var promises = titles.map(function (title) {
          return articles.doc(title)
        }).map(function (doc) {
          return getCount(doc)
        })
        Promise.all(promises).then(function (counts) {
          var metas = $('.post-meta')
          counts.forEach(function (val, idx) {
            appendCountTo(metas[idx])(val)
          })
        })
      }
    })()
  </script>


  
  

  
  

  


  
<script>
if ($('body').find('div.pdf').length) {
  $.ajax({
    type: 'GET',
    url: '//cdn.jsdelivr.net/npm/pdfobject@2/pdfobject.min.js',
    dataType: 'script',
    cache: true,
    success: function() {
      $('body').find('div.pdf').each(function(i, o) {
        PDFObject.embed($(o).attr('target'), $(o), {
          pdfOpenParams: {
            navpanes: 0,
            toolbar: 0,
            statusbar: 0,
            pagemode: 'thumbs',
            view: 'FitH'
          },
          PDFJS_URL: '/lib/pdf/web/viewer.html',
          height: $(o).attr('height') || '500px'
        });
      });
    },
  });
}
</script>


  
<script>
if ($('body').find('pre.mermaid').length) {
  $.ajax({
    type: 'GET',
    url: '//cdn.jsdelivr.net/npm/mermaid@8/dist/mermaid.min.js',
    dataType: 'script',
    cache: true,
    success: function() {
      mermaid.initialize({
        theme: 'dark',
        logLevel: 3,
        flowchart: { curve: 'linear' },
        gantt: { axisFormat: '%m/%d/%Y' },
        sequence: { actorMargin: 50 }
      });
    }
  });
}
</script>


  
  <script>
    (function(){
      var bp = document.createElement('script');
      var curProtocol = window.location.protocol.split(':')[0];
      bp.src = (curProtocol === 'https') ? 'https://zz.bdstatic.com/linksubmit/push.js' : 'http://push.zhanzhang.baidu.com/push.js';
      var s = document.getElementsByTagName("script")[0];
      s.parentNode.insertBefore(bp, s);
    })();
  </script>


  

  

  

  

  
  
  
  <script src="/lib/bookmark/bookmark.min.js?v=1.0"></script>
  <script>
  
    bookmark.scrollToMark('auto', "#更多");
  
  </script>


  
<script>
  $('.highlight').not('.gist .highlight').each(function(i, e) {
    var $wrap = $('<div>').addClass('highlight-wrap');
    $(e).after($wrap);
    $wrap.append($('<button>').addClass('copy-btn').append('复制').on('click', function(e) {
      var code = $(this).parent().find('.code').find('.line').map(function(i, e) {
        return $(e).text();
      }).toArray().join('\n');
      var ta = document.createElement('textarea');
      var yPosition = window.pageYOffset || document.documentElement.scrollTop;
      ta.style.top = yPosition + 'px'; // Prevent page scroll
      ta.style.position = 'absolute';
      ta.style.opacity = '0';
      ta.readOnly = true;
      ta.value = code;
      document.body.appendChild(ta);
      const selection = document.getSelection();
      const selected = selection.rangeCount > 0 ? selection.getRangeAt(0) : false;
      ta.select();
      ta.setSelectionRange(0, code.length);
      ta.readOnly = false;
      var result = document.execCommand('copy');
      
        if (result) $(this).text('复制成功');
        else $(this).text('复制失败');
      
      ta.blur(); // For iOS
      $(this).blur();
      if (selected) {
        selection.removeAllRanges();
        selection.addRange(selected);
      }
    })).on('mouseleave', function(e) {
      var $b = $(this).find('.copy-btn');
      setTimeout(function() {
        $b.text('复制');
      }, 300);
    }).append(e);
  })
</script>


  

  

</body>
</html>
